#include "config.h"

#include <time.h>
#include <sys/mman.h>
#include <libaio.h>
#include <sys/types.h>
#include <utime.h>
#include <errno.h>
#include <time.h>

#define DBG_SUBSYS S_LIBREPLICA

#include "replica.h"
#include "job_dock.h"
#include "sysy_lib.h"
#include "core.h"
#include "cluster.h"
#include "disk.h"
#include "../storage/md_parent.h"
#include "lich_md.h"
#include "squeue.h"
#include "disk_maping.h"
#include "timer.h"
#include "net_global.h"
#include "bmap.h"
#include "sequence.h"
#include "disk_redis.h"
#include "dbg.h"
#include "node.h"
#include "diskmd.h"

/* Short local alias for the per-chunk entry type stored in the caches below. */
typedef replica_srv_entry_t entry_t;

/* Declared here because replica_srv_init() calls it as its last step. */
void replica_srv_preload();

/*
 * Pool-name strings shared between entries.  pool_save() returns a pointer
 * into this vector so entries do not each carry their own copy of the name.
 */
struct {
        sy_rwlock_t lock;       /* taken by pool_save() around accesses to array */
        vec_str_t array;        /* the saved pool-name strings */
} pool_array;

// Cache of chunks that belong to a pool
static mcache_t *cache_pool;

// Cache of chunks that belong to a volume
static mcache_t *cache_vol;

/* Forward declarations of helpers defined further down in this file. */
static int __drop(void *value, mcache_entry_t *cent, int recycle);
static int __replica_srv_get(mcache_t *cache, const chkid_t *chkid, mcache_entry_t **_cent);

/* Service instance; allocated in replica_srv_init(). */
replica_srv_t *replica_srv;

/*
 * Drop a cache-entry reference by forwarding to mcache_release().
 */
STATIC void __replica_srv_release(mcache_entry_t *cent)
{
        mcache_release(cent);
}

/*
 * Key/entry match callback handed to mcache_init().
 *
 * s1 is the lookup key (a chkid_t) and s2 is a cached entry_t.  Returns 1
 * when chkid_cmp() reports 0 for the key and the entry's chkid, 0 otherwise.
 */
static int __cmp(const void *s1, const void *s2)
{
        const chkid_t *key = s1;
        const entry_t *cached = s2;

        return chkid_cmp(key, &cached->chkid) == 0;
}

/**
 * Hash callback handed to mcache_init().
 *
 * @todo a string-based hash (snprintf of id/idx followed by hash_str) was
 *       noted as costing too much CPU; the fields are combined
 *       arithmetically instead.
 *
 * @param key points to a chkid_t
 * @return id multiplied by (idx + 1), converted to uint32_t
 */
static uint32_t __hash(const void *key)
{
        const chkid_t *chkid = key;

        return chkid->id * (chkid->idx + 1);
}

/*
 * Core-selection hash callback handed to mcache_init(): treats key as a
 * chkid_t and returns core_hash() of it.
 */
static uint32_t __core_hash(const void *key)
{
        return core_hash((const chkid_t *)key);
}

/*
 * Release an entry built by __entry_init().
 *
 * The entry must have no queued waiters and (with ENABLE_CHUNK_PARALLEL) no
 * write in flight; both are asserted.
 */
static void __entry_free(entry_t *ent)
{
        YASSERT(list_empty(&ent->wq.wlist));

        DBUG("chunk "CHKID_FORMAT" free\n", CHKID_ARG(&ent->chkid));

#if ENABLE_CHUNK_PARALLEL
        YASSERT(ent->pio.writing == 0);
        /*
         * __entry_init() creates the bits lock for non-raw chunks AND for raw
         * chunks when gloconf.read_modify_write is set, and stores NULL
         * otherwise.  Key the teardown on the pointer itself so both cases
         * are released.
         */
        if (ent->pio.bits_lock != NULL)
                bits_lock_destory(&ent->pio.bits_lock);
#endif

        yfree1((void **)&ent);
}

/*
 *  Save the same pool name, To avoid the waste of memory
 */

static char *pool_save(const char *pool)
{
        int ret, i;
        char *_pool;

        YASSERT(pool);

        ret = sy_rwlock_rdlock(&pool_array.lock);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        vec_foreach(&pool_array.array, _pool, i) {
                if (0 == strcmp(_pool, pool)) {
                        sy_rwlock_unlock(&pool_array.lock);
                        goto out;
                }
        }

        sy_rwlock_unlock(&pool_array.lock);

        DINFO("pool %s not cached! vec length:%d\n", pool, pool_array.array.length);

        ret = ymalloc((void **)&_pool, MAX_NAME_LEN);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        strcpy(_pool, pool);

        ret = sy_rwlock_wrlock(&pool_array.lock);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = vec_push(&pool_array.array, _pool);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }

        sy_rwlock_unlock(&pool_array.lock);

out:
        return _pool;
err_lock:
        sy_rwlock_unlock(&pool_array.lock);
err_ret:
        UNIMPLEMENTED(__DUMP__);
        return NULL;
}

/*
 * Allocate and initialise a cache entry.
 *
 * Copies chkid, vclock, loc and parent into the new entry, interns the pool
 * name through pool_save(), clears the owner and initialises the wait queue.
 * With ENABLE_CHUNK_PARALLEL it also sets up the parallel-IO state: a bits
 * lock is created for non-raw chunks and for any chunk when
 * gloconf.read_modify_write is set; otherwise bits_lock is NULL.
 *
 * On success *_ent receives the entry and 0 is returned; on failure an errno
 * value is returned and *_ent is left untouched.
 *
 * NOTE(review): ent->magic and ent->token are not assigned here; presumably
 * ymalloc1() hands back zeroed memory - confirm.
 */
static int __entry_init(entry_t **_ent, const chkid_t *chkid, const vclock_t *vclock,
                        const char *pool, const diskloc_t *loc, const chkid_t *parent)
{
        int ret;
        entry_t *ent;

        YASSERT(chkid->id);

        ret = ymalloc1((void **)&ent, sizeof(*ent));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ent->chkid = *chkid;
        ent->vclock = *vclock;
        ent->loc = *loc;
        ent->parent = *parent;

        /* pool_save() returns NULL on failure; never publish such an entry. */
        ent->pool = pool_save(pool);
        if (unlikely(ent->pool == NULL)) {
                ret = ENOMEM;
                GOTO(err_free, ret);
        }

#if ENABLE_CHUNK_PARALLEL
        ent->pio.writing = 0;

        ret = sy_spin_init(&ent->pio.lock);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        if(unlikely((chkid->type != __RAW_CHUNK__)
                    || (gloconf.read_modify_write))) {

                ret = bits_lock_create(&ent->pio.bits_lock, LICH_CHUNK_SPLIT / REPLICA_DISK_MD_PAGE_SIZE / 8);
                if (unlikely(ret))
                        GOTO(err_free, ret);
        } else {
                ent->pio.bits_lock = NULL;
        }
#endif
        memset(&ent->owner, 0x0, sizeof(ent->owner));

        ret = sy_spin_init(&ent->wq.lock);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        INIT_LIST_HEAD(&ent->wq.wlist);

        *_ent = ent;

        return 0;
err_free:
        yfree1((void **)&ent);
err_ret:
        return ret;
}

/*
 * Read the stored clock of a chunk into clockstat.
 *
 * When clock_get() succeeds, vclock is whatever it wrote, dirty is the flag
 * it reported and lost is 0.  When it reports ENOENT the whole clockstat is
 * zeroed and lost is set to 1; that case still returns 0.  Any other error
 * is returned to the caller.
 */
STATIC int __replica_srv_get_clock__(const chkid_t *chkid, clockstat_t *clockstat)
{
        int ret;
        int is_dirty = 0;

        ret = clock_get(chkid, &clockstat->vclock, &is_dirty);
        if (likely(ret == 0)) {
                clockstat->dirty = is_dirty;
                clockstat->lost = 0;
                return 0;
        }

        if (ret != ENOENT)
                GOTO(err_ret, ret);

        /* No clock recorded for this chunk: report it as lost. */
        memset(clockstat, 0x0, sizeof(*clockstat));
        clockstat->lost = 1;

        return 0;
err_ret:
        return ret;
}

/*
 * Build a cache entry for a chunk that is already recorded on disk.
 *
 * Fetches location, parent and pool name via diskmd_chunk_load(), the clock
 * via __replica_srv_get_clock__(), and assembles the entry with
 * __entry_init().  ENOENT from diskmd_chunk_load() is returned without the
 * error log that GOTO emits for other failures.
 */
static int __entry_load(const chkid_t *chkid, entry_t **_ent)
{
        int ret;
        char pool[MAX_NAME_LEN];
        diskloc_t loc;
        chkid_t parent;
        clockstat_t cs;
        entry_t *loaded;

        YASSERT(!chkid_isnull(chkid));

        ret = diskmd_chunk_load(chkid, &loc, &parent, pool);
        if (unlikely(ret)) {
                DBUG("chunk "CHKID_FORMAT"\n", CHKID_ARG(chkid));
                if (ret != ENOENT)
                        GOTO(err_ret, ret);

                goto err_ret;
        }

        ret = __replica_srv_get_clock__(chkid, &cs);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

#if ENABLE_CHUNK_DEBUG
        DINFO("chunk "CHKID_FORMAT" "VCLOCK_FORMAT",%u,%u, parent "CHKID_FORMAT"\n",
              CHKID_ARG(chkid), VCLOCK_ARG(&cs.vclock), cs.dirty,
              cs.lost, CHKID_ARG(&parent));
#else
        DBUG("chunk "CHKID_FORMAT" "VCLOCK_FORMAT",%u,%u, parent "CHKID_FORMAT"\n",
              CHKID_ARG(chkid), VCLOCK_ARG(&cs.vclock), cs.dirty,
              cs.lost, CHKID_ARG(&parent));
#endif

        ret = __entry_init(&loaded, chkid, &cs.vclock, pool, &loc, &parent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        *_ent = loaded;

        return 0;
err_ret:
        return ret;
}

/*
 * Build a cache entry for a brand-new chunk: same as __entry_init() but with
 * an all-zero vclock.
 */
static int __entry_create(const char *pool, const chkid_t *chkid, const diskloc_t *loc,
                          const chkid_t *parent, entry_t **_ent)
{
        int ret;
        entry_t *created;
        vclock_t zero_clock;

        memset(&zero_clock, 0x0, sizeof(zero_clock));

        ret = __entry_init(&created, chkid, &zero_clock, pool, loc, parent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        *_ent = created;

        return 0;
err_ret:
        return ret;
}

/*
 * Look up the cache entry for chkid in `cache`, loading it on a miss.
 *
 * When mcache_get() reports ENOENT the key is reserved with
 * mcache_insert_lock(), the entry is built by __entry_load(), stored in
 * cent->value, published with mcache_insert_unlock(), and the lookup is
 * retried.  EEXIST from mcache_insert_lock() also retries (presumably another
 * task inserted the same key first - confirm against mcache).  The YASSERT
 * fires once 10 retries have been used.
 *
 * On success *_cent holds the entry obtained from mcache_get() and 0 is
 * returned.  Otherwise an errno value is returned; a failure of
 * __entry_load() is passed through without an extra error log after the
 * reserved slot is dropped with mcache_remove_unlock().
 */
STATIC int __replica_srv_get(mcache_t *cache, const chkid_t *chkid, mcache_entry_t **_cent)
{
        int ret, retry = 0;
        entry_t *ent;
        mcache_entry_t *cent;

        //DBUG("get "CHKID_FORMAT"\n", CHKID_ARG(chkid));

retry:
        ret = mcache_get(cache, chkid, &cent);
        if (unlikely(ret)) {
                if (ret == ENOENT) {
                        /* Bound the number of miss/insert/retry rounds. */
                        YASSERT(retry < 10);

                        ret = mcache_insert_lock(cache, chkid, &cent);
                        if (unlikely(ret)) {
                                if (ret == EEXIST) {
                                        DINFO("insert "CHKID_FORMAT"\n", CHKID_ARG(chkid));
                                        retry++;
                                        goto retry;
                                } else
                                        GOTO(err_ret, ret);
                        }

                        ret = __entry_load(chkid, &ent);
                        if (unlikely(ret)) {
                                goto err_lock;
                        }

                        cent->value = ent;

                        mcache_insert_unlock(cache, chkid, cent);

                        /* Go around again so the entry is fetched through mcache_get(). */
                        DBUG("insert "CHKID_FORMAT"\n", CHKID_ARG(chkid));
                        retry++;
                        goto retry;
                } else
                        GOTO(err_ret, ret);
        }

        *_cent = cent;

        return 0;
#if 1
err_lock:
        /* Loading failed: give up the slot reserved by mcache_insert_lock(). */
        mcache_remove_unlock(cent);
#endif
err_ret:
        return ret;
}

/*
 * Report whether a chunk's stored clock is zero.
 *
 * *empty is set to 1 when vclock.clock reads 0 (which includes the case
 * where __replica_srv_get_clock__() found no clock and zeroed the result),
 * and to 0 otherwise.  A non-zero clock is logged.
 */
STATIC int __replica_srv_isempty(const chkid_t *chkid, int *empty)
{
        int ret;
        clockstat_t cs;

        *empty = 0;

        ret = __replica_srv_get_clock__(chkid, &cs);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (cs.vclock.clock == 0) {
                *empty = 1;
                return 0;
        }

#if ENABLE_CHUNK_DEBUG
        DINFO("chunk "CHKID_FORMAT" clock %ju empty %d\n", CHKID_ARG(chkid), cs.vclock.clock, *empty);
#else
        DBUG("chunk "CHKID_FORMAT" clock %ju empty %d\n", CHKID_ARG(chkid), cs.vclock.clock, *empty);
#endif

        return 0;
err_ret:
        return ret;
}

/*
 * Create one chunk: build its entry, initialise its disk area, and insert it
 * write-locked into the matching cache.
 *
 * Volume chunks (chkid_isvol()) go to cache_vol, everything else to
 * cache_pool.  The owner is recorded only for raw chunks; other chunk types
 * get a zeroed owner.  When initdata is given, is not aligned, and RDMA is
 * off, a clone of it (mbuffer_clone) is used for the disk write and freed
 * before returning.
 *
 * On success *_cent holds the inserted cache entry and 0 is returned;
 * __replica_srv_create_unlock() is what later unlocks and releases it.
 * Returns an errno value otherwise (EEXIST is treated by the caller as
 * "chunk already present").
 */
STATIC int __replica_srv_create_lock__(const char *pool, const nid_t *owner,
                                       const chkid_t *chkid, const diskloc_t *loc, const chkid_t *parent,
                                       int initzero, const buffer_t *initdata,
                                       uint64_t fingerprint, mcache_entry_t **_cent)
{
        int ret;
        entry_t *ent;
        mcache_entry_t *cent;
        mcache_t *cache;
        buffer_t *tmp;
        buffer_t align;

        (void) fingerprint;

        if (likely(gloconf.rdma || initdata == NULL || mbuffer_isalign(initdata))) {
                tmp = (buffer_t *)initdata;
        } else {
                mbuffer_clone(&align, initdata);
                tmp = &align;
        }

        ret = __entry_create(pool, chkid, loc, parent, &ent);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        DBUG("create "CHKID_FORMAT" owner %s\n", CHKID_ARG(&ent->chkid),
             network_rname(owner));

        if (chkid->type == __RAW_CHUNK__) {
                ent->owner = *owner;
        } else {
                memset(&ent->owner, 0x0, sizeof(ent->owner));
        }

        ret = diskmd_create_init(chkid, &ent->loc, initzero, tmp);
        if (unlikely(ret))
                GOTO(err_del, ret);

        cache = chkid_isvol(chkid) ? cache_vol : cache_pool;

        ret = mcache_insert_wrlock(cache, chkid, ent, &cent);
        if (unlikely(ret)) {
                GOTO(err_del, ret);
        }

        *_cent = cent;

        if (unlikely(tmp == &align))
                mbuffer_free(tmp);
        return 0;
err_del:
        //disk_unlink(chkid, &ent->loc, meta_version);
        /*
         * Tear the entry down through __entry_free() rather than a bare
         * yfree1(), so that resources attached by __entry_init() (the bits
         * lock) are released as well.
         */
        __entry_free(ent);
err_ret:
        if (unlikely(tmp == &align))
                mbuffer_free(tmp);
        return ret;
}

/*
 * One successfully created chunk, as recorded by __replica_srv_create_lock().
 */
typedef struct {
        mcache_entry_t *cent;   /* cache entry from __replica_srv_create_lock__(); unlocked and released by __replica_srv_create_unlock() */
        chkid_t chkid;          /* id of the created chunk */
        diskloc_t loc;          /* disk location used for the chunk */
} locked_t;

/*
 * Create and lock a batch of chunks.
 *
 * Chunks are taken from chkids[] in order; each one that is created consumes
 * the next unused slot of locs[] (so the slots actually used are
 * locs[0 .. *_locked_count - 1]).  A chunk that already exists (EEXIST) is
 * skipped without consuming a slot.  Every created chunk is appended to
 * locked[].
 *
 * Returns 0 with *_locked_count set to the number of created chunks.  On any
 * other error the entries locked so far are unlocked and released,
 * *_locked_count is set to 0 and the errno value is returned.  As with the
 * caller's own failure path after __replica_srv_insert(), those entries are
 * only unlocked, not removed from the cache.
 */
STATIC int __replica_srv_create_lock(const char *pool, const nid_t *owner,
                                     const chkid_t *chkids, const diskloc_t *locs, int chknum,
                                     locked_t *locked, int *_locked_count,
                                     const chkid_t *parent, int initzero,
                                     const buffer_t *initdata,
                                     uint64_t fingerprint)
{
        int ret, i, locked_count = 0;
        mcache_entry_t *cent;
        const diskloc_t *loc;

        ANALYSIS_BEGIN(0);

        for (i = 0; i < chknum; i++) {
                /* Next unused disk slot; advances only when a chunk is created. */
                loc = &locs[locked_count];

                ret = __replica_srv_create_lock__(pool, owner, &chkids[i], loc,
                                                  parent, initzero, initdata, fingerprint, &cent);
                if (unlikely(ret)) {
                        if (ret == EEXIST) {
                                DBUG("chunk "CHKID_FORMAT" exist, need recycle\n",
                                       CHKID_ARG(&chkids[i]));
                                continue;
                        } else
                                GOTO(err_unlock, ret);
                }

                locked[locked_count].cent = cent;
                locked[locked_count].chkid = chkids[i];
                locked[locked_count].loc = *loc;
                locked_count++;

#if ENABLE_BMAP_DEBUG
                /* Check the slot that was actually used, not locs[i]. */
                diskmd_exists(loc, 1);
#endif

        }

        *_locked_count = locked_count;

        ANALYSIS_QUEUE(0, 1000 * 100, "replica_srv_create_lock");

        return 0;
err_unlock:
        /* Do not leave earlier entries write-locked and referenced forever. */
        for (i = 0; i < locked_count; i++) {
                mcache_unlock(locked[i].cent);
                mcache_release(locked[i].cent);
        }
        *_locked_count = 0;

        return ret;
}

/*
 * Unlock and release the first locked_count cache entries recorded in
 * locked[].
 */
STATIC void __replica_srv_create_unlock(locked_t *locked, int locked_count)
{
        int idx;

        ANALYSIS_BEGIN(0);

        for (idx = 0; idx < locked_count; idx++) {
                mcache_unlock(locked[idx].cent);
                mcache_release(locked[idx].cent);
        }

        ANALYSIS_QUEUE(0, 1000 * 100, "replica_srv_create_unlock");
}


/*
 * Record a batch of freshly created chunks in the disk mapping and give each
 * of them a zero clock.
 *
 * The ids and locations from locked[] are copied into two temporary arrays
 * for disk_maping->create(), which is called with
 * O_CREAT | O_EXCL | O_SYNC; afterwards clock_set() stores an all-zero
 * vclock for every chunk.  tier and fingerprint are accepted but unused.
 *
 * Returns 0 on success or the first errno value encountered.
 */
STATIC int __replica_srv_insert(const char *pool, const locked_t *locked, int locked_count,
                                const chkid_t *parent, int tier, uint64_t meta_version,
                                uint64_t fingerprint)
{
        int ret, i;
        chkid_t *chkids;
        diskloc_t *locs;
        vclock_t vclock;

        (void) tier;
        (void) fingerprint;

        ANALYSIS_BEGIN(0);

        ret = ymalloc((void **)&chkids, sizeof(*chkids) * locked_count);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* Handle this allocation failure like the one above. */
        ret = ymalloc((void **)&locs, sizeof(*locs) * locked_count);
        if (unlikely(ret)) {
                yfree((void **)&chkids);
                GOTO(err_ret, ret);
        }

        for (i = 0; i < locked_count; i++) {
                chkids[i] = locked[i].chkid;
                locs[i] = locked[i].loc;
        }

        ret = disk_maping->create(pool, chkids, locs, locked_count, parent,
                                  meta_version, O_CREAT | O_EXCL | O_SYNC);

        /* The temporary arrays are not needed past this call, pass or fail. */
        yfree((void **)&chkids);
        yfree((void **)&locs);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        memset(&vclock, 0x0, sizeof(vclock));
        for (i = 0; i < locked_count; i++) {
                ret = clock_set(&locked[i].chkid, &vclock, 0);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        ANALYSIS_QUEUE(0, 1000 * 100, "replica_srv_insert");

        return 0;
err_ret:
        return ret;
}



/*
 * Create a batch of chunks on disk locations that are already allocated.
 *
 * Steps: create and lock the entries (__replica_srv_create_lock), record
 * them in the disk mapping (__replica_srv_insert), unlock them, then hand
 * the locations that were not consumed - locs[locked_count .. chknum - 1],
 * left over when chunks already existed - back with diskmd_delete().
 *
 * Returns 0 on success or an errno value.  On failure the locations in locs
 * are not handed back here.
 */
STATIC int __replica_srv_create____(const char *pool, const nid_t *owner,
                                    const chkid_t *chkids, const diskloc_t *locs, int chknum,
                                    const chkid_t *parent, int tier, int initzero,
                                    const buffer_t *initdata, uint64_t meta_version,
                                    uint64_t fingerprint)
{
        int ret, locked_count = 0, i;
        locked_t *locked;

        DBUG("create chunk count %u\n", chknum);

        ret = ymalloc((void **)&locked, sizeof(*locked) * chknum);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __replica_srv_create_lock(pool, owner, chkids, locs, chknum,
                                        locked, &locked_count, parent, initzero,
                                        initdata, fingerprint);
        if (unlikely(ret))
                GOTO(err_free, ret);

        if (locked_count) {
                ret = __replica_srv_insert(pool, locked, locked_count, parent, tier,
                                           meta_version, fingerprint);
                if (unlikely(ret))
                        GOTO(err_lock, ret);
        }

        __replica_srv_create_unlock(locked, locked_count);

        /* Slots past locked_count were never used by a chunk. */
        for (i = locked_count; i < chknum; i++) {
                //DERROR("%u chunk need recycle\n", chknum - locked_count);
                DINFO("delete %d loc "LOC_FORMAT"\n", i, LOC_ARG(&locs[i]));
                diskmd_delete(&locs[i]);
        }

        yfree((void **)&locked);

        return 0;
err_lock:
        __replica_srv_create_unlock(locked, locked_count);
err_free:
        yfree((void **)&locked);
err_ret:
        return ret;
}

/*
 * Allocate chknum disk locations in `pool` via diskmd_create() and create
 * the chunks on them via __replica_srv_create____().
 *
 * `tier` is passed to diskmd_create() both by address and by value, and
 * whatever it holds afterwards is forwarded to the chunk creation step.
 */
STATIC int __replica_srv_create__(const char *pool, const nid_t *owner,
                                  const chkid_t *chkids, int chknum, const chkid_t *parent,
                                  int tier, int initzero, const buffer_t *initdata,
                                  uint64_t meta_version, uint64_t fingerprint)
{
        int ret;
        diskloc_t *disk_locs;

        ret = ymalloc((void **)&disk_locs, sizeof(*disk_locs) * chknum);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        CORE_ANALYSIS_BEGIN(1);

        ret = diskmd_create(pool, disk_locs, chknum, &tier, tier);
        if (unlikely(ret)) {
                GOTO(err_free, ret);
        }

        CORE_ANALYSIS_UPDATE(1, IO_WARN, "replica_srv_create");

        // TODO: if this step fails, what happens to the disk bitmap that was allocated?
        ret = __replica_srv_create____(pool, owner, chkids, disk_locs, chknum,
                                       parent, tier, initzero, initdata, meta_version, fingerprint);
        if (unlikely(ret))
                GOTO(err_free, ret);

        yfree((void **)&disk_locs);

        return 0;
err_free:
        yfree((void **)&disk_locs);
err_ret:
        return ret;
}

/*
 * Drop callback handed to mcache_init(): frees the entry_t stored as a cache
 * value.
 *
 * value   - the entry_t to free; NULL is ignored
 * cent    - unused
 * recycle - when non-zero the drop is logged
 *
 * Always returns 0.
 */
static int __drop(void *value, mcache_entry_t *cent, int recycle)
{
        entry_t *ent;

        (void) cent;
        ent = (entry_t *)value;

        if (ent) {
                if (recycle) {
                        /* CHKID_FORMAT must be fed through CHKID_ARG, as everywhere else. */
                        DINFO("recycle "CHKID_FORMAT"\n", CHKID_ARG(&ent->chkid));
                }
                __entry_free(ent);
        }

        return 0;
}

/*
 * Synchronise gloconf.kv_redis with the node's kv_engine file
 * (<gloconf.home>/data/node/config/kv_engine).
 *
 * If the file can be read, kv_redis becomes 1 when its content starts with
 * "redis" and 0 otherwise.  If the file does not exist it is created from
 * the current kv_redis setting ("redis\n" or "sqlite3\n").
 */
static int __get_kv_engine()
{
        int ret;
        char path[MAX_PATH_LEN], buf[MAX_BUF_LEN], *engine;

        snprintf(path, MAX_PATH_LEN, "%s/data/node/config/kv_engine", gloconf.home);

        ret = path_validate(path, YLIB_NOTDIR, YLIB_DIRCREATE);
        if (ret)
                GOTO(err_ret, ret);

        ret = _get_value(path, buf, MAX_BUF_LEN);
        if (ret >= 0) {
                /* The file is authoritative once it exists. */
                DINFO("get node kv_engine %s\n", buf);

                gloconf.kv_redis = (strncmp(buf, "redis", strlen("redis")) == 0);

                return 0;
        }

        /* _get_value() reports failure as a negated errno. */
        ret = -ret;
        if (ret != ENOENT)
                GOTO(err_ret, ret);

        engine = gloconf.kv_redis ? "redis\n" : "sqlite3\n";
        DINFO("set kv_engine %s", engine);
        ret = _set_value(path, engine, strlen(engine) + 1, O_CREAT | O_EXCL);
        if (ret)
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Initialise the replica service.
 *
 * In order: settle the kv engine choice, allocate the global replica_srv and
 * record `home`, initialise disk metadata, set up the shared pool-name
 * vector, create the pool and volume chunk caches, initialise the clock
 * store and the cleanup machinery, and finally call replica_srv_preload().
 *
 * The cache capacity is the larger of _max_chunk and the count reported by
 * diskmd_init(); the volume cache is created with four times that capacity.
 *
 * Returns 0 on success or an errno value.
 *
 * NOTE(review): home is copied with strcpy() and the error path returns
 * without undoing earlier steps - confirm both are acceptable to callers.
 */
int replica_srv_init(const char *home, uint64_t _max_chunk)
{
        int ret;
        uint64_t max_chunk, max_chunk_real;

        ANALYSIS_BEGIN(0);

        ret = __get_kv_engine();
        if (ret)
                GOTO(err_ret, ret);
        
        DINFO("init replica srv %s, max %ju, kv_engine %s\n", home, _max_chunk, gloconf.kv_redis ? "redis" : "sqlite3");
        
        ret = ymalloc((void **)&replica_srv, sizeof(*replica_srv));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        strcpy(replica_srv->home, home);

        ANALYSIS_BEGIN(3);

        ret = diskmd_init(home, &max_chunk_real);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_END(3, 1000 * 100, NULL);

        /* Never size the caches below what diskmd_init() reported. */
        if (max_chunk_real > _max_chunk) {
                DINFO("init replica srv %s, change %ju -> %ju\n",
                      home, _max_chunk, max_chunk_real);
                max_chunk = max_chunk_real;
        } else {
                max_chunk = _max_chunk;
        }

        ret = sy_rwlock_init(&pool_array.lock, "pool_array.lock");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        vec_init(&pool_array.array);

        ANALYSIS_BEGIN(1);
        ret = mcache_init(&cache_pool, max_chunk, __cmp, __hash, __core_hash,
                          __drop, 0, "replica_srv_pool");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = mcache_init(&cache_vol, max_chunk * 4, __cmp, __hash, __core_hash,
                          __drop, 1, "replica_srv_vol");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_END(1, 1000 * 100, NULL);

        ANALYSIS_BEGIN(2);

        ret = clock_init(max_chunk);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_END(2, 1000 * 100, NULL);

#if 0
        ret = disk_mt_init();
        if (unlikely(ret))
                GOTO(err_ret, ret);
#endif

        ret = replica_cleanup_init();
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_END(0, 1000 * 100, NULL);

        replica_srv_preload();

        return 0;
err_ret:
        return ret;
}

/*
 * Shut the service down.  Only diskmd_close() is called; nothing else set up
 * by replica_srv_init() is released here.
 */
void replica_srv_destroy()
{
        diskmd_close();
}

/*
 * Fill *_clockstat for a cached entry.
 *
 * The dirty and lost flags come from the stored clock
 * (__replica_srv_get_clock__), while vclock is taken from the entry itself
 * rather than from the store.
 */
STATIC int __replica_srv_get_clock(entry_t *ent, clockstat_t *_clockstat)
{
        int ret;
        clockstat_t stored;

        ret = __replica_srv_get_clock__(&ent->chkid, &stored);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        _clockstat->lost = stored.lost;
        _clockstat->dirty = stored.dirty;
        _clockstat->vclock = ent->vclock;

        return 0;
err_ret:
        return ret;
}

/*
 * Cancel every task parked on the entry's wait queue with a read-wait or
 * write-wait operation: each one is unlinked and resumed with EAGAIN.
 * Other queue members are left in place.  A warning is logged when more than
 * 10 waiters were cancelled.
 */
STATIC void __replica_srv_wait_cancel(entry_t *ent)
{
        int ret;
        int cancelled = 0;
        wlist_t *waiter;
        struct list_head *cur, *next;

        ret = sy_spin_lock(&ent->wq.lock);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        list_for_each_safe(cur, next, &ent->wq.wlist) {
                /* The list node is reinterpreted as the enclosing wlist_t. */
                waiter = (void *)cur;

                if (waiter->op != __OP_READ_WAIT__ && waiter->op != __OP_WRITE_WAIT__)
                        continue;

                list_del(&waiter->hook);
                schedule_resume(&waiter->task, EAGAIN, NULL);
                cancelled++;
        }

        if (cancelled > 10) {
                DWARN("chunk "CHKID_FORMAT" wait %u\n", CHKID_ARG(&ent->chkid), cancelled);
        }

        sy_spin_unlock(&ent->wq.lock);
}

/*
 * Decide whether `owner` may connect to the chunk held in `ent`.
 *
 * Checks, in order:
 *  - token->master must equal ng.master_magic, otherwise ESTALE;
 *  - if the entry's recorded master differs from the token's, the remaining
 *    lease checks are skipped;
 *  - otherwise an older lease sequence is rejected (YASSERT(0), then EPERM),
 *    a different and still connected owner is only accepted with a strictly
 *    newer sequence (else EPERM), and a repeated magic yields EKEYEXPIRED;
 *  - with ENABLE_CHUNK_PARALLEL, a write in flight yields EAGAIN.
 *
 * `force` is only logged and asserted to be 0 in the owner-conflict branch.
 * Returns 0 when the connect may proceed.
 */
STATIC int __replica_srv_connect_check(entry_t *ent, const nid_t *owner,
                                       const lease_token_t *token, uint32_t magic, int force)
{
        int ret;

        if (ng.master_magic != token->master) {
                DERROR("chunk "CHKID_FORMAT" got old master 0x%x -> 0x%x\n",
                       CHKID_ARG(&ent->chkid), ng.master_magic, token->master);
                ret = ESTALE;
                GOTO(err_ret, ret);
        }
        
        //if (ent->token.master != master || force) {
        if (ent->token.master != token->master) {//master reset, all lease is recalled
                DBUG("chunk "CHKID_FORMAT" got new master 0x%x -> 0x%x, lease %ju -> %ju, force %u, force connect\n",
                     CHKID_ARG(&ent->chkid), ent->token.master, token->master, ent->token.seq, token->seq, force);
        } else {
                if (ent->token.seq > token->seq) {
                        DERROR("chunk "CHKID_FORMAT" got old lease %ju -> %ju, master 0x%x, just reject\n",
                               CHKID_ARG(&ent->chkid), ent->token.seq, token->seq, token->master);
                        YASSERT(0);
                        ret = EPERM;
                        GOTO(err_ret, ret);
                }

#if 1
                /* A different owner that is still connected holds the chunk. */
                if (nid_cmp(&ent->owner, owner) && netable_connected(&ent->owner)) {
                        YASSERT(force == 0);
                        if (token->seq > ent->token.seq) {
                                //if (force) {
                                // TODO
                                DBUG("force connect "CHKID_FORMAT" owner %s, new %s\n",
                                     CHKID_ARG(&ent->chkid),
                                     network_rname(&ent->owner),
                                     network_rname(owner));
                        } else {
                                // TODO: stopping a node and then restarting it ends up in this branch
                                ret = EPERM;
                                DWARN("connect "CHKID_FORMAT" owner %s -> %s, token %ju -> %ju\n",
                                      CHKID_ARG(&ent->chkid), network_rname(&ent->owner),
                                      network_rname(owner), ent->token.seq, token->seq);
                                goto err_ret;
                        }
                }
#endif
                /* Same magic as the one already recorded on the entry. */
                if (unlikely(ent->magic == magic)) {
                        ret = EKEYEXPIRED;
                        GOTO(err_ret, ret);
                }
        }

#if ENABLE_CHUNK_PARALLEL
        if (ent->pio.writing) {
                DWARN("chunk "CHKID_FORMAT" writing %u, owner %s\n",
                      CHKID_ARG(&ent->chkid), ent->pio.writing, network_rname(owner));
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }
#endif

        return 0;
err_ret:
        return ret;
}

/*
 * core_request() handler behind replica_srv_connect().
 *
 * Arguments, in va_list order: owner (nid_t *), chkid (chkid_t *),
 * parent (chkid_t *), token (lease_token_t *), magic (uint32_t),
 * clockstat (clockstat_t *, output), force (int).
 *
 * Under the entry's write lock it validates the request
 * (__replica_srv_connect_check), requires the chunk's disk to be online
 * (ENODEV otherwise, after removing the chunk's clock), fills *clockstat,
 * updates the parent if it changed, cancels parked waiters, and records the
 * new owner, magic and token on the entry.
 *
 * Returns 0 on success or an errno value.
 */
STATIC int __replica_srv_connect(va_list ap)
{
        int ret, online;
        mcache_entry_t *cent;
        entry_t *ent;
        const nid_t *owner = va_arg(ap, nid_t *);
        const chkid_t *chkid = va_arg(ap, chkid_t *);
        const chkid_t *parent = va_arg(ap, chkid_t *);
        const lease_token_t *token = va_arg(ap, lease_token_t *);
        uint32_t magic = va_arg(ap, uint32_t);
        clockstat_t *clockstat = va_arg(ap, clockstat_t *);
        int force = va_arg(ap, int);

        va_end(ap);

        ANALYSIS_BEGIN(0);

        ret = replica_srv_get(chkid, &cent);
        if (unlikely(ret)) {
                /* for No data available, very dangerous */
                if (ret == ENOENT) {
                        DWARN("chunk "CHKID_FORMAT" owner %s not found!\n",
                              CHKID_ARG(chkid), network_rname(owner));
                }

                GOTO(err_ret, ret);
        }

        ret = mcache_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ent = cent->value;

        ret = __replica_srv_connect_check(ent, owner, token, magic, force);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        ret = diskmd_online(ent->loc.diskid, &online);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        // Check the disk state first - this is very important!
        if (unlikely(!online)) {
                DBUG("connect "CHKID_FORMAT" disk %d online %d\n",
                      CHKID_ARG(&ent->chkid),
                      ent->loc.diskid,
                      online);
                clock_remove(chkid);//remove clock for safe
                ret = ENODEV;
                GOTO(err_lock, ret);
        }

        ret = __replica_srv_get_clock(ent, clockstat);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        if (chkid_cmp(parent, &ent->parent)) {
                ret = disk_maping->setparent(chkid, parent);
                if (unlikely(ret))
                        GOTO(err_lock, ret);

                ent->parent = *parent;
        }

        //memset(&ent->owner, 0x0, sizeof(ent->owner));

        __replica_srv_wait_cancel(ent);

        ent->owner = *owner;
        ent->magic = magic;
        ent->token = *token;

        /*
         * Log before the lock and the reference are given up: once the entry
         * is released, ent must not be dereferenced any more.
         */
#if ENABLE_CHUNK_DEBUG
        DINFO("connect "CHKID_FORMAT" owner %s clock %llu/%d, token (0x%x, %ju), magic %x, force: %d\n",
              CHKID_ARG(&ent->chkid), network_rname(owner),
              (LLU)clockstat->vclock.clock, clockstat->dirty,
              token->master, token->seq, magic, force);
#else
        DBUG("connect "CHKID_FORMAT" owner %s clock %llu/%d, token (0x%x, %ju), magic %x, force: %d\n",
             CHKID_ARG(&ent->chkid), network_rname(owner),
             (LLU)clockstat->vclock.clock, clockstat->dirty,
             token->master, token->seq, magic, force);
#endif

        mcache_unlock(cent);
        __replica_srv_release(cent);

        ANALYSIS_END(0, 1000 * 100, NULL);

        return 0;
err_lock:
        mcache_unlock(cent);
err_release:
        __replica_srv_release(cent);
err_ret:
        ANALYSIS_END(0, 1000 * 100, NULL);

        return ret;
}

/*
 * Connect `owner` to a chunk.
 *
 * Dispatches __replica_srv_connect() through core_request(), keyed by
 * core_hash(chkid), and returns its result unchanged (0 on success, an errno
 * value otherwise).  *clockstat is filled by the handler.
 */
int replica_srv_connect(const nid_t *owner, const chkid_t *chkid,
                        const chkid_t *parent, const lease_token_t *token, uint32_t magic,
                        clockstat_t *clockstat, int force)
{
        return core_request(core_hash(chkid), -1, "replica_connect", __replica_srv_connect,
                            owner, chkid, parent, token, magic, clockstat, force);
}

/*
 * core_request() handler behind replica_srv_getclock().
 *
 * Arguments, in va_list order: owner (nid_t *), chkid (chkid_t *),
 * clockstat (clockstat_t *, output).
 *
 * Fills *clockstat for the chunk while holding the entry's read lock.  The
 * owner is used for logging only; no ownership check is performed.
 *
 * Returns 0 on success or an errno value.
 */
STATIC int __replica_srv_getclock(va_list ap)
{
        int ret;
        mcache_entry_t *cent;
        entry_t *ent;
        const nid_t *owner = va_arg(ap, nid_t *);
        const chkid_t *chkid = va_arg(ap, chkid_t *);
        clockstat_t *clockstat = va_arg(ap, clockstat_t *);

        va_end(ap);

        ret = replica_srv_get(chkid, &cent);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = mcache_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ent = cent->value;

        ret = __replica_srv_get_clock(ent, clockstat);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        /* Same switch as the other chunk logs in this file (ENABLE_CHUNK_DEBUG). */
#if ENABLE_CHUNK_DEBUG
        DINFO("getclock "CHKID_FORMAT" owner %s clock %llu\n",
              CHKID_ARG(&ent->chkid), network_rname(owner), (LLU)clockstat->vclock.clock);
#else
        DBUG("getclock "CHKID_FORMAT" owner %s clock %llu\n",
              CHKID_ARG(&ent->chkid), network_rname(owner), (LLU)clockstat->vclock.clock);
#endif

        mcache_unlock(cent);
        __replica_srv_release(cent);

        return 0;
err_lock:
        mcache_unlock(cent);
err_release:
        __replica_srv_release(cent);
err_ret:
        return ret;
}

/*
 * Fetch the clock state of a chunk.
 *
 * Dispatches __replica_srv_getclock() through core_request(), keyed by
 * core_hash(chkid).  Returns 0 on success with *clockstat filled, or the
 * errno value reported by the request.
 */
int replica_srv_getclock(const nid_t *owner, const chkid_t *chkid, clockstat_t *clockstat)
{
        int ret;

        ret = core_request(core_hash(chkid), -1, "getclock", __replica_srv_getclock,
                           owner, chkid, clockstat);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

#if 0
/*
 * Disabled code (compiled out by the surrounding #if 0).
 *
 * Takes the cache entry of @chkid under the write lock, records @owner in it,
 * optionally initialises the on-disk data via diskmd_create_init() and stores
 * @meta_version through disk_maping->setmetaversion(). Unless @force is set,
 * a chunk that __replica_srv_isempty() reports as non-empty yields EEXIST.
 * On success the entry reference is handed to the caller through @_cent.
 */
STATIC int __replica_srv_load_empty(const nid_t *owner, const chkid_t *chkid,
                                    mcache_entry_t **_cent, int initzero, const buffer_t *initdata,
                                    uint64_t meta_version, int force)
{
        int ret, empty;
        mcache_entry_t *cent;
        entry_t *ent;

        ret = __replica_srv_isempty(chkid, &empty);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (empty == 0 && force == 0) {
                ret = EEXIST;
                GOTO(err_ret, ret);
        }

        ret = replica_srv_get(chkid, &cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = mcache_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        YASSERT(cent->value);
        ent = cent->value;
        ent->owner = *owner;

        if (initzero || initdata) {
                ret = diskmd_create_init(chkid, &ent->loc, initzero, initdata);
                if (unlikely(ret))
                        GOTO(err_lock, ret);
        }

        ret = disk_maping->setmetaversion(chkid, meta_version);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        mcache_unlock(cent);
        /* the reference is not released here: it is returned to the caller */
        *_cent = cent;

        return 0;
err_lock:
        mcache_unlock(cent);
err_release:
        __replica_srv_release(cent);
err_ret:
        return ret;
}
#endif

/* Non-zero makes __replica_srv_create() use the batched existence filter
 * instead of the one-by-one __replica_srv_create_filter1(). */
#define ENABLE_REPLICA_LOOKUP_MULTITASK 1
/* Threshold on in-flight lookup tasks, checked before each new task is
 * scheduled in __replica_srv_create_filter1_multi__(). */
#define REPLICA_LOOKUP_MULTITASK        10

/**
 * Records the result of one lookup task.
 *
 * One slot is kept per task so that the status of every task in the
 * multitask lookup can be inspected afterwards.
 */
typedef struct {
        // chkid_t chkid;
        // return value of __replica_exists() for this chunk
        int retval;
        // "exists" flag reported by __replica_exists() for this chunk
        int exists;
} __replica_lookup_entry_t;

/**
 * Per-task argument of __replica_exists_wrapper(). The task frees it with
 * yfree() when it finishes.
 */
typedef struct {
        // condition the task broadcasts on when it finishes
        co_cond_t *cond;
        // counter of outstanding tasks, decremented by the task when it finishes
        int *task_count;

        // input parameter
        const chkid_t *chkid;

        // result slots shared by all tasks of one lookup
        __replica_lookup_entry_t *array;
        int array_idx;         ///< index into array
} __replica_lookup_ctx_t;

/**
 * Report whether a replica of @chkid is known to the cache layer.
 *
 * The lookup result of replica_srv_get() is mapped as follows:
 *   0       -> exists (the reference obtained is released again)
 *   ENOENT  -> does not exist
 *   ENODEV  -> exists
 *   other   -> error, *exists stays 0
 *
 * @param chkid   chunk to look up
 * @param exists  out: 1 if the chunk exists, 0 otherwise
 * @return 0 when a definite answer was obtained, otherwise the lookup error
 */
static inline int __replica_exists(const chkid_t *chkid, int *exists)
{
        int ret;
        mcache_entry_t *found;

        *exists = 0;

        ret = replica_srv_get(chkid, &found);
        switch (ret) {
        case 0:
                *exists = 1;
                replica_srv_release(found);
                break;
        case ENOENT:
                // nothing to do
                *exists = 0;
                break;
        case ENODEV:
                *exists = 1;
                break;
        default:
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Task body for one chunk of the multitask lookup.
 *
 * Runs __replica_exists() for ctx->chkid, stores both the return value and
 * the exists flag in the task's result slot, decrements the shared task
 * counter, broadcasts the outcome on the shared condition and frees the
 * context.
 *
 * @param arg  a __replica_lookup_ctx_t allocated by the scheduler side
 */
void __replica_exists_wrapper(void *arg)
{
        __replica_lookup_ctx_t *ctx = arg;
        __replica_lookup_entry_t *slot = &ctx->array[ctx->array_idx];
        int exists;
        int ret = __replica_exists(ctx->chkid, &exists);

        /* the slot is filled in on success and on failure alike */
        slot->retval = ret;
        slot->exists = exists;

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        // @multitask_return
        (*ctx->task_count)--;
        co_cond_broadcast(ctx->cond, 0);
        yfree((void **)&ctx);
        return;
err_ret:
        (*ctx->task_count)--;
        co_cond_broadcast(ctx->cond, ret);
        yfree((void **)&ctx);
}

/**
 * Probe every chunk in @chkids with __replica_exists(), one scheduled task
 * per chunk, and return the per-chunk results.
 *
 * At most REPLICA_LOOKUP_MULTITASK tasks are kept in flight. The tasks hold
 * pointers to this function's local condition and counter, so every exit
 * path waits until the counter is back to zero before returning.
 *
 * @param chkids  chunks to probe; every id must be non-zero (asserted)
 * @param chknum  number of entries in @chkids
 * @param _array  out: on success an array of @chknum result slots, to be
 *                released by the caller with yfree(); NULL on failure
 * @return 0 on success, otherwise the error reported by ymalloc() or
 *         co_cond_wait2()
 */
int __replica_srv_create_filter1_multi__(const chkid_t *chkids, int chknum,
                                         __replica_lookup_entry_t **_array)
{
        int ret, i;
        int task_count = 0;
        co_cond_t cond;
        __replica_lookup_ctx_t *ctx;

        *_array = NULL;

        __replica_lookup_entry_t *array;

        ret = ymalloc((void **)&array, sizeof(__replica_lookup_entry_t) * chknum);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memset(array, 0x0, sizeof(__replica_lookup_entry_t) * chknum);

        co_cond_init(&cond);

        for (i = 0; i < chknum; i++) {
                YASSERT(chkids[i].id);

                /* ">=" so that the task scheduled below never pushes the
                 * number of in-flight tasks above the limit */
                while (task_count >= REPLICA_LOOKUP_MULTITASK) {
                        ret = co_cond_wait2(&cond, __FUNCTION__);
                        if (unlikely(ret)) {
                                // TODO: any single failure leaves the loop
                                GOTO(err_cond, ret);
                        }
                }

                ret = ymalloc((void **)&ctx, sizeof(__replica_lookup_ctx_t));
                if (unlikely(ret))
                        GOTO(err_cond, ret);

                memset(ctx, 0x0, sizeof(__replica_lookup_ctx_t));

                ctx->cond = &cond;
                ctx->task_count = &task_count;

                ctx->chkid = &chkids[i];
                ctx->array_idx = i;
                ctx->array = array;

                schedule_task_new(__FUNCTION__, __replica_exists_wrapper, ctx, -1);
                task_count++;
        }

        while (task_count > 0) {
                ret = co_cond_wait2(&cond, __FUNCTION__);
                if (unlikely(ret)) {
                        GOTO(err_cond, ret);
                }
        }

        *_array = array;
        return 0;
err_cond:
        /* drain: outstanding tasks still reference cond, task_count and array */
        while (task_count > 0) {
                co_cond_wait2(&cond, __FUNCTION__);
        }

        yfree((void **)&array);
err_ret:
        return ret;
}

/**
 * Reduce @chkids to the chunks that do not exist yet, using the multitask
 * lookup.
 *
 * @param chkids      candidate chunks
 * @param chknum      number of entries in @chkids
 * @param _array      out: newly allocated array of the missing chunks, or
 *                    NULL when none is missing; released by the caller
 * @param _array_len  out: number of entries in *_array
 * @return 0 on success, otherwise the lookup error, the first non-zero
 *         per-chunk return value, or the allocation error
 */
int __replica_srv_create_filter1_multi(const chkid_t *chkids, int chknum,
                                       chkid_t **_array, int *_array_len)
{
        int ret, i, missing = 0;
        __replica_lookup_entry_t *results;
        chkid_t *out;

        *_array = NULL;
        *_array_len = 0;

        // probe every chunk; results has chknum slots
        ret = __replica_srv_create_filter1_multi__(chkids, chknum, &results);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        // count the chunks that do not exist
        for (i = 0; i < chknum; i++) {
                ret = results[i].retval;
                if (unlikely(ret)) {
                        GOTO(err_free, ret);
                }

                if (results[i].exists == 0)
                        missing++;
        }

        if (missing > 0) {
                // hand back the chunks that do not exist
                ret = ymalloc((void **)&out, sizeof(chkid_t) * missing);
                if (unlikely(ret)) {
                        GOTO(err_free, ret);
                }

                missing = 0;
                for (i = 0; i < chknum; i++) {
                        if (results[i].retval != 0 || results[i].exists)
                                continue;

                        out[missing++] = chkids[i];
                }

                *_array = out;
                *_array_len = missing;
        }

        yfree((void **)&results);
        return 0;
err_free:
        yfree((void **)&results);
err_ret:
        return ret;
}





/**
 * Reduce @chkids to the chunks that do not exist yet, using a single
 * disk_maping->exist() call for the whole batch.
 *
 * @param chkids      candidate chunks
 * @param chknum      number of entries in @chkids
 * @param _array      out: newly allocated array of the missing chunks, or
 *                    NULL when none is missing; released by the caller
 * @param _array_len  out: number of entries in *_array
 * @return 0 on success, otherwise the allocation or lookup error
 */
int __replica_srv_create_filter2_multi(const chkid_t *chkids, int chknum,
                                       chkid_t **_array, int *_array_len)
{
        int ret, i, missing = 0;
        int *flags;
        chkid_t *out;

        *_array = NULL;
        *_array_len = 0;

        ret = ymalloc((void **)&flags, sizeof(int) * chknum);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memset(flags, 0x0, sizeof(int) * chknum);

        ret = disk_maping->exist(chkids, chknum, flags);
        if (unlikely(ret))
                GOTO(err_free, ret);

        // only the entries that do not exist are of interest
        for (i = 0; i < chknum; i++) {
                if (flags[i] == 0)
                        missing++;
        }

        if (missing > 0) {
                ret = ymalloc((void **)&out, sizeof(chkid_t) * missing);
                if (unlikely(ret)) {
                        GOTO(err_free, ret);
                }

                missing = 0;
                for (i = 0; i < chknum; i++) {
                        if (flags[i])
                                continue;

                        out[missing++] = chkids[i];
                }

                *_array = out;
                *_array_len = missing;
        }

        yfree((void **)&flags);
        return 0;
err_free:
        yfree((void **)&flags);
err_ret:
        return ret;
}

/**
 * Reduce @chkids to the chunks that do not exist yet, probing them one by
 * one with __replica_exists().
 *
 * @param chkids      candidate chunks; every id must be non-zero (asserted)
 * @param chknum      number of entries in @chkids
 * @param _array      out: newly allocated array of the missing chunks, or
 *                    NULL when none is missing; released by the caller
 * @param _array_len  out: number of entries in *_array
 * @return 0 on success, otherwise the allocation error or the first error
 *         returned by __replica_exists()
 */
int __replica_srv_create_filter1(const chkid_t *chkids, int chknum,
                                 chkid_t **_array, int *_array_len)
{
        int ret, i, exists;
        chkid_t *array;
        int array_len;

        *_array = NULL;
        *_array_len = 0;

        ret = ymalloc((void **)&array, sizeof(chkid_t) * chknum);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memset(array, 0, sizeof(chkid_t) * chknum);

        array_len = 0;

        for (i = 0; i < chknum; i++) {
                YASSERT(chkids[i].id);

                ret = __replica_exists(&chkids[i], &exists);
                if (unlikely(ret)) {
                        // ENOENT is reported as 0 (a definite answer was obtained);
                        // any real error leaves the loop
                        GOTO(err_free, ret);
                }

                if (!exists) {
                        array[array_len++] = chkids[i];
                }
        }

        if (array_len == 0) {
                /* Nothing is missing: return NULL/0 like the *_multi variants
                 * do, so a caller that frees only when the length is non-zero
                 * does not leak the scratch array. */
                yfree((void **)&array);
                return 0;
        }

        *_array = array;
        *_array_len = array_len;

        return 0;
err_free:
        yfree((void **)&array);
err_ret:
        return ret;
}

/**
 * core_request handler: create the replicas in a batch that do not exist yet.
 *
 * Variadic arguments, in order:
 *   const char *pool, const nid_t *owner, const chkid_t *chkids, int chknum,
 *   const fileid_t *parent, int tier, int initzero, const buffer_t *initdata,
 *   uint64_t meta_version, uint64_t fingerprint, int force
 *
 * @force is accepted but ignored. The batch is first filtered down to the
 * chunks that do not exist; only those are passed to __replica_srv_create__().
 *
 * @return 0 on success (including the case where nothing had to be created),
 *         otherwise the error from the filter or from __replica_srv_create__()
 */
STATIC int __replica_srv_create(va_list ap)
{
        const char *pool = va_arg(ap, const char *);
        const nid_t *owner = va_arg(ap, nid_t *);
        const chkid_t *chkids = va_arg(ap, chkid_t *);
        int chknum = va_arg(ap, int);
        const fileid_t *parent = va_arg(ap, fileid_t *);
        int tier = va_arg(ap, int);
        int initzero = va_arg(ap, int);
        const buffer_t *initdata = va_arg(ap, buffer_t *);
        uint64_t meta_version = va_arg(ap, uint64_t);
        uint64_t fingerprint = va_arg(ap, uint64_t);
        int force = va_arg(ap, int);

        va_end(ap);

        (void) force;

        int ret, array_len = 0;
        chkid_t *array = NULL;

        ANALYSIS_BEGIN(0);

#if ENABLE_REPLICA_LOOKUP_MULTITASK
        // ret = __replica_srv_create_filter1_multi(chkids, chknum, &array, &array_len);
        ret = __replica_srv_create_filter2_multi(chkids, chknum, &array, &array_len);
#else
        ret = __replica_srv_create_filter1(chkids, chknum, &array, &array_len);
#endif
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_QUEUE(0, 1000 * 100, "replica_srv_create_check");

        DBUG("chknum %d array len %d\n", chknum, array_len);

        if (array_len) {
                ret = __replica_srv_create__(pool, owner, array, array_len, parent, tier, initzero,
                                             initdata, meta_version, fingerprint);
                if (unlikely(ret)) {
                        if (ret == EEXIST) {
                                UNIMPLEMENTED(__DUMP__);
                        } else
                                GOTO(err_free, ret);
                }
        }

        /* Free whatever the filter allocated, even when it reported zero
         * missing chunks: a filter may return an allocation with length 0. */
        if (array) {
                yfree((void **)&array);
        }

        return 0;
err_free:
        if (array) {
                yfree((void **)&array);
        }
err_ret:
        return ret;
}

/**
 * Create replicas for a batch of chunks.
 *
 * A fresh fingerprint is taken from sequence_get() and the request is
 * submitted through core_request(), keyed by core_hash() of the first chunk
 * id, with __replica_srv_create as the handler.
 *
 * @param pool          pool name
 * @param owner         owning node
 * @param chkids        chunks to create; chkids[0] is read unconditionally
 * @param chknum        number of entries in @chkids
 * @param parent        parent id; parent->id must be non-zero (asserted)
 * @param tier          forwarded to the handler
 * @param initzero      forwarded to the handler
 * @param initdata      forwarded to the handler
 * @param meta_version  forwarded to the handler
 * @param force         forwarded to the handler
 * @return 0 on success, otherwise the error from sequence_get() or
 *         core_request()
 */
int replica_srv_create(const char *pool, const nid_t *owner, const chkid_t *chkids, int chknum,
                       const fileid_t *parent, int tier, int initzero,
                       const buffer_t *initdata, uint64_t meta_version, int force)
{
        int ret;
        uint64_t fingerprint;
        const chkid_t *first = &chkids[0];

        DBUG("create chunk "CHKID_FORMAT" @ %s\n",
              CHKID_ARG(first), network_rname(owner));

        ANALYSIS_BEGIN(0);

        YASSERT(parent->id);

        ret = sequence_get(&fingerprint);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = core_request(core_hash(first), -1, "replica_create",
                           __replica_srv_create,
                           pool, owner, chkids, chknum, parent, tier,
                           initzero, initdata, meta_version, fingerprint, force);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ANALYSIS_QUEUE(0, IO_WARN, "replica_srv_create");

        DBUG("create chunk "CHKID_FORMAT" @ %s\n",
              CHKID_ARG(first), network_rname(owner));

        return 0;
err_ret:
        return ret;
}

/**
 * Create replicas for a batch of chunks with a caller-supplied fingerprint.
 *
 * Same request as replica_srv_create(), except that @fingerprint is passed
 * through instead of being taken from sequence_get().
 *
 * @param chkid  first chunk of the batch; chkid->id must be non-zero (asserted)
 * @return 0 on success, otherwise the error returned by core_request()
 */
int replica_srv_create_with_fingerprint(const char *pool, const nid_t *owner,
                                        const chkid_t *chkid, int chknum, const fileid_t *parent, int tier, int initzero,
                                        const buffer_t *initdata, uint64_t meta_version, uint64_t fingerprint, int force)
{
        int ret;

        YASSERT(chkid->id);

        ret = core_request(core_hash(chkid), -1, "replica_create",
                           __replica_srv_create,
                           pool, owner, chkid, chknum, parent, tier,
                           initzero, initdata, meta_version, fingerprint, force);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Remove the on-disk data of a chunk whose cache entry the caller holds.
 *
 * @param chkid         chunk being removed
 * @param ent           cache entry of the chunk; its location is copied
 *                      before __replica_srv_wqcheck() is called
 * @param meta_version  forwarded to diskmd_unlink()
 * @return 0 on success, otherwise the error from __replica_srv_wqcheck()
 *         or diskmd_unlink()
 */
STATIC int __replica_srv_unlink__(const chkid_t *chkid, entry_t *ent, uint64_t meta_version)
{
        int ret;
        diskloc_t loc;

        loc = ent->loc;

        ret = __replica_srv_wqcheck(ent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* meta_version is uint64_t: print it with %llu and an (LLU) cast, as
         * the rest of this file does, instead of the signed %ld */
#if ENABLE_CHUNK_DEBUG
        DINFO("unlink "CHKID_FORMAT" metaversion %llu\n", CHKID_ARG(chkid),
              (LLU)meta_version);
#else
        DBUG("unlink "CHKID_FORMAT" metaversion %llu\n", CHKID_ARG(chkid),
              (LLU)meta_version);
#endif

        ret = diskmd_unlink(chkid, &loc, meta_version);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * core_request handler: unlink a chunk replica and drop its cache entry.
 *
 * Variadic arguments, in order: const chkid_t *chkid, uint64_t meta_version.
 *
 * A chunk that replica_srv_get() reports as ENOENT counts as success.
 * While __replica_srv_unlink__() returns EBUSY the write lock is dropped,
 * the task sleeps via schedule_sleep() and the attempt is repeated; once the
 * attempt count exceeds gloconf.lease_timeout, EXIT(EAGAIN) is invoked.
 *
 * @return 0 on success, otherwise the error from the lookup, the write lock
 *         or __replica_srv_unlink__()
 */
STATIC int __replica_srv_unlink(va_list ap)
{
        int ret, attempts = 0;
        mcache_entry_t *cent;
        entry_t *ent;
        const chkid_t *chkid = va_arg(ap, chkid_t *);
        uint64_t meta_version = va_arg(ap, uint64_t);

        va_end(ap);

        ret = replica_srv_get(chkid, &cent);
        if (unlikely(ret)) {
                if (ret == ENOENT)
                        return 0;

                GOTO(err_ret, ret);
        }

        for (;;) {
                ret = mcache_wrlock(cent);
                if (unlikely(ret))
                        GOTO(err_release, ret);

                ent = cent->value;

                ret = __replica_srv_unlink__(chkid, ent, meta_version);
                if (likely(ret == 0))
                        break;

                if (ret != EBUSY)
                        GOTO(err_lock, ret);

                /* busy: give up the lock before sleeping and trying again */
                mcache_unlock(cent);

                attempts++;
                if (attempts > gloconf.lease_timeout) {
                        DERROR("unlink "CHKID_FORMAT" fail, restart for safe\n", CHKID_ARG(chkid));
                        EXIT(EAGAIN);
                }

                DERROR("unlink "CHKID_FORMAT" retry %u\n", CHKID_ARG(chkid), attempts);
                schedule_sleep("replica_unlink", 1000 * 1000);
        }

        mcache_drop_nolock(cent);

        mcache_unlock(cent);
        __replica_srv_release(cent);

        DBUG("cleanup "CHKID_FORMAT"\n", CHKID_ARG(chkid));

        return 0;
err_lock:
        mcache_unlock(cent);
err_release:
        __replica_srv_release(cent);
err_ret:
        return ret;
}

/**
 * Unlink the replica of @chkid.
 *
 * Submits __replica_srv_unlink through core_request(), keyed by
 * core_hash(chkid).
 *
 * @param chkid         chunk to remove; must not be the null id (asserted)
 * @param meta_version  forwarded to the handler
 * @return 0 on success, otherwise the error returned by core_request()
 */
int replica_srv_unlink(const chkid_t *chkid, uint64_t meta_version)
{
        int ret;

        YASSERT(!chkid_isnull(chkid));

        ret = core_request(core_hash(chkid), -1, "unlink",
                           __replica_srv_unlink, chkid, meta_version);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * core_request handler: read the parent id and pool name of a cached chunk.
 *
 * Variadic arguments, in order:
 *   const chkid_t *chkid - chunk to query
 *   chkid_t *parent      - out, may be NULL
 *   char *pool           - out, may be NULL
 *
 * NOTE(review): the pool name is copied with strcpy(); the caller's buffer
 * must be large enough for ent->pool. The required size is not visible here.
 *
 * @return 0 on success, otherwise the error from the lookup or the read lock
 */
STATIC int __replica_srv_getparent(va_list ap)
{
        int ret;
        mcache_entry_t *cent;
        entry_t *ent;
        const chkid_t *chkid = va_arg(ap, chkid_t *);
        chkid_t *parent_out = va_arg(ap, chkid_t *);
        char *pool_out = va_arg(ap, char *);

        va_end(ap);

        ret = replica_srv_get(chkid, &cent);
        if (unlikely(ret))
                goto err_ret;

        ret = mcache_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ent = cent->value;

        if (parent_out != NULL)
                *parent_out = ent->parent;

        if (pool_out != NULL)
                strcpy(pool_out, ent->pool);

        YASSERT(ent->parent.id);
#if ENABLE_CHUNK_DEBUG
        DINFO("chunk "CHKID_FORMAT" pool %s parent "CHKID_FORMAT"\n",
              CHKID_ARG(chkid), ent->pool, CHKID_ARG(&ent->parent));
#else
        DBUG("chunk "CHKID_FORMAT" pool %s parent "CHKID_FORMAT"\n",
              CHKID_ARG(chkid), ent->pool, CHKID_ARG(&ent->parent));
#endif

        mcache_unlock(cent);
        __replica_srv_release(cent);

        return 0;
err_release:
        __replica_srv_release(cent);
err_ret:
        return ret;
}

/**
 * Read the parent id and pool name of @chkid.
 *
 * Submits __replica_srv_getparent through core_request(), keyed by
 * core_hash(chkid). Failures are returned without going through the GOTO
 * macro.
 *
 * @param chkid   chunk to query
 * @param parent  out, may be NULL
 * @param pool    out, may be NULL
 * @return 0 on success, otherwise the error returned by core_request()
 */
int replica_srv_getparent(const chkid_t *chkid, chkid_t *parent, char *pool)
{
        int ret = core_request(core_hash(chkid), -1, "getparent",
                               __replica_srv_getparent, chkid, parent, pool);

        if (unlikely(ret)) {
                goto err_ret;
        }

        return 0;
err_ret:
        return ret;
}

/**
 * core_request handler: persist a new parent for a chunk and mirror it in
 * the cached entry.
 *
 * Variadic arguments, in order: const chkid_t *chkid, const chkid_t *parent.
 *
 * The cached parent is only updated after disk_maping->setparent() succeeds.
 *
 * @return 0 on success, otherwise the error from the lookup, the write lock
 *         or disk_maping->setparent()
 */
STATIC int __replica_srv_setparent(va_list ap)
{
        const chkid_t *chkid = va_arg(ap, chkid_t *);
        const chkid_t *new_parent = va_arg(ap, chkid_t *);
        mcache_entry_t *cent;
        entry_t *ent;
        int ret;

        va_end(ap);

        ret = replica_srv_get(chkid, &cent);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = mcache_wrlock(cent);
        if (unlikely(ret)) {
                GOTO(err_release, ret);
        }

        ret = disk_maping->setparent(chkid, new_parent);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }

        ent = cent->value;
        ent->parent = *new_parent;

        mcache_unlock(cent);
        __replica_srv_release(cent);

        return 0;
err_lock:
        mcache_unlock(cent);
err_release:
        __replica_srv_release(cent);
err_ret:
        return ret;
}

/**
 * Set the parent of @chkid.
 *
 * Submits __replica_srv_setparent through core_request(), keyed by
 * core_hash(chkid).
 *
 * @return 0 on success, otherwise the error returned by core_request()
 */
int replica_srv_setparent(const chkid_t *chkid, const chkid_t *parent)
{
        int ret = core_request(core_hash(chkid), -1, "setparent",
                               __replica_srv_setparent, chkid, parent);

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * core_request handler: store a new clock for a cached chunk replica.
 *
 * Variadic arguments, in order:
 *   const nid_t *owner            - requesting node
 *   const chkid_t *chkid          - chunk to update
 *   const clockstat_t *clockstat  - clock and dirty flag to store
 *
 * The request is rejected with EPERM when nid_cmp() reports that @owner
 * differs from the owner recorded in the entry. The cached vclock is only
 * updated after clock_set() succeeds.
 *
 * @return 0 on success, EPERM on owner mismatch, otherwise the error from
 *         the lookup, the write lock or clock_set()
 */
static int __replica_srv_setclock(va_list ap)
{
        const nid_t *owner = va_arg(ap, nid_t *);
        const chkid_t *chkid = va_arg(ap, chkid_t *);
        const clockstat_t *clockstat = va_arg(ap, clockstat_t *);
        mcache_entry_t *cent;
        entry_t *ent;
        int ret;

        va_end(ap);

        ret = replica_srv_get(chkid, &cent);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = mcache_wrlock(cent);
        if (unlikely(ret)) {
                GOTO(err_release, ret);
        }

        ent = cent->value;

        if (nid_cmp(&ent->owner, owner)) {
                ret = EPERM;
                DWARN("setclock "CHKID_FORMAT" owner %s, new %s\n",
                      CHKID_ARG(&ent->chkid), network_rname(&ent->owner),
                      network_rname(owner));
                GOTO(err_lock, ret);
        }

        ret = clock_set(&ent->chkid, &clockstat->vclock, clockstat->dirty);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }

        ent->vclock = clockstat->vclock;

#if ENABLE_CHUNK_DEBUG
        DINFO("setclock "CHKID_FORMAT" owner %s "VCLOCK_FORMAT"\n",
              CHKID_ARG(&ent->chkid), network_rname(owner), VCLOCK_ARG(&clockstat->vclock));
#else
        DBUG("setclock "CHKID_FORMAT" owner %s "VCLOCK_FORMAT"\n",
              CHKID_ARG(&ent->chkid), network_rname(owner), VCLOCK_ARG(&clockstat->vclock));
#endif

        mcache_unlock(cent);
        __replica_srv_release(cent);

        return 0;
err_lock:
        mcache_unlock(cent);
err_release:
        __replica_srv_release(cent);
err_ret:
        return ret;
}

/**
 * Store a new clock for @chkid.
 *
 * Submits __replica_srv_setclock through core_request(), keyed by
 * core_hash(chkid).
 *
 * @return 0 on success, otherwise the error returned by core_request()
 */
int replica_srv_setclock(const nid_t *owner, const chkid_t *chkid, const clockstat_t *clockstat)
{
        int ret = core_request(core_hash(chkid), -1, "setclock",
                               __replica_srv_setclock, owner, chkid, clockstat);

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * core_request handler: copy the disk location and parent id of a cached
 * chunk to the caller.
 *
 * Variadic arguments, in order:
 *   const chkid_t *chkid - chunk to query
 *   diskloc_t *loc       - out, must not be NULL
 *   chkid_t *parent      - out, must not be NULL
 *
 * @return 0 on success, otherwise the error from the lookup or the read lock
 */
STATIC int __replica_srv_load(va_list ap)
{
        const chkid_t *chkid = va_arg(ap, chkid_t *);
        diskloc_t *loc_out = va_arg(ap, diskloc_t *);
        chkid_t *parent_out = va_arg(ap, chkid_t *);
        mcache_entry_t *cent;
        const entry_t *ent;
        int ret;

        va_end(ap);

        ret = replica_srv_get(chkid, &cent);
        if (unlikely(ret)) {
                goto err_ret;
        }

        ret = mcache_rdlock(cent);
        if (unlikely(ret)) {
                GOTO(err_release, ret);
        }

        ent = cent->value;
        *parent_out = ent->parent;
        *loc_out = ent->loc;

        mcache_unlock(cent);
        __replica_srv_release(cent);

        return 0;
err_release:
        __replica_srv_release(cent);
err_ret:
        return ret;
}

/**
 * Read the disk location and parent id of @chkid.
 *
 * Submits __replica_srv_load through core_request(), keyed by
 * core_hash(chkid). @fingerprint and @wbdisk are not looked up; on success
 * both are set to -1.
 *
 * NOTE(review): the request is labelled "getparent", the same label that
 * replica_srv_getparent() uses. Possibly a copy-paste; confirm before
 * renaming.
 *
 * @return 0 on success, otherwise the error returned by core_request()
 */
int replica_srv_getinfo(const chkid_t *chkid, diskloc_t *loc, chkid_t *parent,
                uint64_t *fingerprint, int *wbdisk)
{
        int ret = core_request(core_hash(chkid), -1, "getparent",
                               __replica_srv_load, chkid, loc, parent);

        if (unlikely(ret)) {
                goto err_ret;
        }

        *wbdisk = -1;
        *fingerprint = -1;

        return 0;
err_ret:
        return ret;
}

/**
 * Look up the cache entry of chunk @id.
 *
 * Ids for which chkid_isvol() is true are served from cache_vol, all others
 * from cache_pool.
 *
 * @param id     chunk to look up
 * @param _cent  out: the entry, to be released with replica_srv_release()
 * @return the result of __replica_srv_get()
 */
int replica_srv_get(const chkid_t *id, mcache_entry_t **_cent)
{
        mcache_t *cache = likely(chkid_isvol(id)) ? cache_vol : cache_pool;

        return __replica_srv_get(cache, id, _cent);
}

/**
 * Release an entry reference obtained from replica_srv_get().
 *
 * @return always 0
 */
int replica_srv_release(mcache_entry_t *cent)
{
        /* same call that __replica_srv_release() makes */
        mcache_release(cent);

        return 0;
}



/**
 * core_request handler: relocate one chunk replica to a newly allocated
 * location obtained from diskmd_create_direct().
 *
 * Variadic arguments, in order:
 *   const chkid_t *chkid - chunk to move
 *   uint32_t from        - disk being left; the new location is asserted
 *                          not to be on this disk
 *
 * Steps, all under the entry's write lock: __replica_srv_wqcheck() must
 * pass, a new location is allocated, the data is read from the old location
 * and written to the new one, the new location is persisted with
 * disk_maping->setloc(), the cached entry is switched over, and the old
 * location is deleted.
 *
 * Once setloc() has succeeded the move is committed: later steps (deleting
 * the old location, querying its tier) can no longer fail the request,
 * otherwise the cached entry would keep pointing at a deleted location.
 *
 * NOTE(review): the RAMDISK_ENABLE sections reference names that are not
 * declared in this function (loc, io, nowloc, buf used as a pointer) and
 * are left untouched; they will not compile if that switch is turned on.
 *
 * @return 0 on success, otherwise the error of the failing step; on failure
 *         the newly allocated location is deleted again where applicable
 */
int __replica_srv_cast(va_list ap)
{
        int ret, tier_new, tier_old;
        mcache_entry_t *cent;
        entry_t *ent;
        diskloc_t newloc, oldloc;
        buffer_t buf;

        const chkid_t *chkid = va_arg(ap, chkid_t *);
        uint32_t from = va_arg(ap, uint32_t);

        va_end(ap);

        ret = replica_srv_get(chkid, &cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = mcache_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ent = cent->value;
        ret = __replica_srv_wqcheck(ent);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        ret = diskmd_create_direct(ent->pool, &newloc, 1, &tier_new);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        mbuffer_init(&buf, LICH_CHUNK_SPLIT);
        YASSERT(newloc.diskid != from);

        ret = diskmd_aio_read(chkid, &ent->loc, &buf, 0, 0);
        if (unlikely(ret))
                GOTO(err_free, ret);

#if RAMDISK_ENABLE
        buffer_t cmp;

        mbuffer_init(&cmp, 0);
        ret = ramdisk_read(chkid, &cmp, buf->len, 0);
        if (unlikely(ret))
                GOTO(err_free1, ret);

        DWARN("replica read "CHKID_FORMAT" offset:%d size:%d loc disk %d idx %d\n",
                        CHKID_ARG(chkid), 0, buf->len, loc->diskid, loc->idx);

        if (mbuffer_compare(&cmp, buf)) {
                DWARN("read raw "CHKID_FORMAT" size %llu(%d,%d) off %llu not match with ramdisk\n",
                                CHKID_ARG(&io->id), (LLU)io->size, buf->len, cmp.len, (LLU)io->offset);
        }

        mbuffer_free(&cmp);
#endif

        ret = diskmd_aio_write(chkid, &newloc, &buf, 0, 0);
        if (unlikely(ret)) {
                GOTO(err_free, ret);
        }

#if RAMDISK_ENABLE
        ret = ramdisk_write(chkid, buf, buf->len, 0);
        if (unlikely(ret)) {
                GOTO(err_free, ret);
        }

        DWARN("replica write "CHKID_FORMAT" offset:%d size:%d loc disk %d idx %d\n",
                        CHKID_ARG(chkid), 0, buf->len, newloc.diskid, nowloc.idx);
#endif

        ret = disk_maping->setloc(chkid, &newloc);
        if (unlikely(ret))
                GOTO(err_free, ret);

        /* The persisted mapping now points at newloc. Switch the cached entry
         * over immediately so that nothing below can leave it referring to
         * the location that is about to be deleted. */
        oldloc = ent->loc;
        ent->loc = newloc;

        DINFO("delete "CHKID_FORMAT" loc "LOC_FORMAT"\n", CHKID_ARG(chkid), LOC_ARG(&oldloc));

        ret = diskmd_delete(&oldloc);
        if (unlikely(ret)) {
                if (ret == ENODEV) {
                } else {
                        UNIMPLEMENTED(__DUMP__);
                }
        }

        mbuffer_free(&buf);

        /* tier_old is not used afterwards, so a failure here (for example
         * because the old disk is gone) must not fail a committed move */
        ret = diskmd_gettier(oldloc.diskid, &tier_old);
        if (unlikely(ret)) {
                DWARN("gettier of disk %d fail, ret %d\n", oldloc.diskid, ret);
        }

        DINFO("move "CHKID_FORMAT" from %d to %d\n", CHKID_ARG(chkid),
              oldloc.diskid, newloc.diskid);

        mcache_unlock(cent);
        replica_srv_release(cent);

        return 0;
#if RAMDISK_ENABLE
err_free1:
        mbuffer_free(&cmp);
#endif
err_free:
        mbuffer_free(&buf);
        diskmd_delete(&newloc);
err_lock:
        mcache_unlock(cent);
err_release:
        replica_srv_release(cent);
err_ret:
        return ret;
}

/**
 * Move the data of chunk @chkid from disk @from to another disk of this node.
 *
 * @note @from is either a cache disk or a normal disk; the two are handled
 *       differently.
 *
 * NOTE(review): the request is labelled "replica_diskonline", the same label
 * that replica_srv_diskonline() uses. Possibly a copy-paste; confirm before
 * renaming.
 *
 * @param chkid  chunk to move
 * @param from   disk the chunk is moved away from
 * @return 0 on success, otherwise the error returned by core_request()
 */
int replica_srv_cast(const chkid_t *chkid, uint32_t from)
{
        int ret = core_request(core_hash(chkid), -1, "replica_diskonline",
                               __replica_srv_cast, chkid, from);

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * core_request handler: report whether the disk holding a chunk is online.
 *
 * Variadic arguments, in order:
 *   const chkid_t *chkid - chunk to query
 *   int *online          - out: value reported by diskmd_online() for the
 *                          disk of the chunk's cached location
 *
 * @return 0 on success, otherwise the error from the lookup, the read lock
 *         or diskmd_online()
 */
STATIC int __replica_srv_diskonline(va_list ap)
{
        const chkid_t *chkid = va_arg(ap, chkid_t *);
        int *online = va_arg(ap, int *);
        mcache_entry_t *cent;
        entry_t *ent;
        int state = 1;
        int ret;

        va_end(ap);

        ret = replica_srv_get(chkid, &cent);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = mcache_rdlock(cent);
        if (unlikely(ret)) {
                GOTO(err_release, ret);
        }

        ent = cent->value;

        ret = diskmd_online(ent->loc.diskid, &state);
        if (ret) {
                GOTO(err_lock, ret);
        }

        *online = state;

        mcache_unlock(cent);
        __replica_srv_release(cent);

        return 0;
err_lock:
        mcache_unlock(cent);
err_release:
        __replica_srv_release(cent);
err_ret:
        return ret;
}

/**
 * Report whether the disk holding @chkid is online.
 *
 * Submits __replica_srv_diskonline through core_request(), keyed by
 * core_hash(chkid).
 *
 * @param chkid   chunk to query
 * @param online  out: online state of the chunk's disk
 * @return 0 on success, otherwise the error returned by core_request()
 */
int replica_srv_diskonline(const chkid_t *chkid, int *online)
{
        int ret = core_request(core_hash(chkid), -1, "replica_diskonline",
                               __replica_srv_diskonline, chkid, online);

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * mcache_iterator() callback that counts entries.
 *
 * @param arg  points to a uint64_t counter, incremented by one per call
 * @param ent  unused
 */
static void __replica_srv_dump_memory(void *arg, void *ent)
{
        uint64_t *counter = arg;

        (void) ent;

        *counter += 1;
}

/**
 * Report the memory used by the replica caches.
 *
 * The value is the number of entries counted in cache_pool and cache_vol,
 * multiplied by sizeof(mcache_entry_t) + sizeof(replica_srv_entry_t).
 *
 * @param memory  out: computed size in bytes
 * @return always 0
 */
int replica_srv_dump_memory(uint64_t *memory)
{
        uint64_t entries[2] = {0, 0};
        const uint64_t per_entry = sizeof(mcache_entry_t) + sizeof(replica_srv_entry_t);

        *memory = 0;

        mcache_iterator(cache_pool, __replica_srv_dump_memory, &entries[0]);
        mcache_iterator(cache_vol, __replica_srv_dump_memory, &entries[1]);

        *memory = (entries[0] + entries[1]) * per_entry;

        return 0;
}

/**
 * Build a cache entry for one chunk and insert it into @cache.
 *
 * @param cache         cache to insert into
 * @param chkid         chunk id, also used as the cache key
 * @param pool          pool name stored in the entry
 * @param loc           disk location stored in the entry
 * @param parent        parent id stored in the entry
 * @param meta_version  unused
 * @return 0 on success, otherwise the error from __replica_srv_get_clock__(),
 *         __entry_init() or mcache_insert_lock()
 */
STATIC int __replica_srv_preload____(mcache_t *cache, const chkid_t *chkid,
                                           const char *pool, const diskloc_t *loc,
                                           const chkid_t *parent, uint64_t meta_version)
{
        int ret;
        entry_t *ent;
        mcache_entry_t *cent;
        clockstat_t clockstat;

        (void) meta_version;

        DBUG("preload "CHKID_FORMAT"\n", CHKID_ARG(chkid));

        ret = __replica_srv_get_clock__(chkid, &clockstat);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = __entry_init(&ent, chkid, &clockstat.vclock, pool, loc, parent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = mcache_insert_lock(cache, chkid, &cent);
        if (unlikely(ret)) {
                GOTO(err_free, ret);
        }

        cent->value = ent;

        mcache_insert_unlock(cache, chkid, cent);

        return 0;
err_free:
        /* The entry was never attached to the cache, so nothing else will
         * release it. Presumably __entry_free() is the counterpart of
         * __entry_init() -- TODO confirm. */
        __entry_free(ent);
err_ret:
        return ret;
}

STATIC int __replica_srv_preload__(va_list ap)
{
        const chkid_t *chkid = va_arg(ap, chkid_t *);
        const char *pool = va_arg(ap, char *);
        const diskloc_t *loc = va_arg(ap, diskloc_t *);
        const chkid_t *parent = va_arg(ap, chkid_t *);
        uint64_t meta_version = va_arg(ap, uint64_t);
        
        va_end(ap);

        if (likely(chkid_isvol(chkid))) {
                return __replica_srv_preload____(cache_vol, chkid, pool, loc, parent, meta_version);
        } else {
                return __replica_srv_preload____(cache_pool, chkid, pool, loc, parent, meta_version);
        }
}

/**
 * Iterator callback used by replica_srv_preload(): preload one chunk.
 *
 * Submits __replica_srv_preload__ through core_request(), keyed by
 * core_hash(chkid). A failure for one chunk is logged and otherwise ignored,
 * so the callback always returns 0.
 *
 * @param ctx  unused
 * @return always 0
 */
static int __replica_srv_preload(const chkid_t *chkid, const char *pool,
                                        const diskloc_t *loc, const chkid_t *parent,
                                        const uint64_t meta_version, void *ctx)
{
        int ret;

        (void) ctx;

        DBUG("preload "CHKID_FORMAT"\n", CHKID_ARG(chkid));

        ret = core_request(core_hash(chkid), -1, "replica_preload",
                           __replica_srv_preload__, chkid,
                           pool, loc, parent, meta_version);
        if (unlikely(ret)) {
                /* best effort: make the failure visible, then carry on */
                DWARN("preload "CHKID_FORMAT" fail, ret %d\n", CHKID_ARG(chkid), ret);
        }

        return 0;
}

/**
 * Preload the replica caches from the disk mapping.
 *
 * Only done when gloconf.kv_redis is set: disk_maping->iterator_new() is run
 * with __replica_srv_preload as the per-chunk callback. Otherwise an error
 * is logged and nothing is loaded.
 */
void replica_srv_preload()
{
        if (!gloconf.kv_redis) {
                DERROR("sqlite3 preload not support\n");
                return;
        }

        ANALYSIS_BEGIN(0);

        disk_maping->iterator_new(__replica_srv_preload, NULL,
                                  DM_FLAG_MD | DM_FLAG_RAW | DM_FLAG_MT);

        ANALYSIS_END(0, 1000 * 100, NULL);
}
